14 重试机制是网络操作的基本保证

在真实的微服务系统中, ZooKeeper、etcd 等服务发现组件一般会独立部署成一个集群,业务服务通过网络连接这些服务发现节点,完成注册和订阅操作。但即使是机房内部的稳定网络,也无法保证两个节点之间的请求一定成功,因此 Dubbo 这类 RPC 框架在稳定性和容错性方面,就受到了比较大的挑战。为了保证服务的可靠性,重试机制就变得必不可少了

所谓的 “重试机制”就是在请求失败时,客户端重新发起一个一模一样的请求,尝试调用相同或不同的服务端,完成相应的业务操作。能够使用重试机制的业务接口得是“幂等”的,也就是无论请求发送多少次,得到的结果都是一样的,例如查询操作。

核心设计

在上一课时中,我们介绍了 AbstractRegistry 中的 register()/unregister()、subscribe()/unsubscribe() 以及 notify() 等核心操作,详细分析了通过本地缓存实现的容错功能。其实,这几个核心方法同样也是重试机制的关注点。

dubbo-registry 将重试机制的相关实现放到了 AbstractRegistry 的子类—— FailbackRegistry 中。如下图所示,接入 ZooKeeper、etcd 等开源服务发现组件的 Registry 实现,都继承了 FailbackRegistry,也就都拥有了失败重试的能力。

Registry继承关系.png

FailbackRegistry 设计核心是:覆盖了 AbstractRegistry 中 register()/unregister()、subscribe()/unsubscribe() 以及 notify() 这五个核心方法,结合前面介绍的时间轮,实现失败重试的能力;真正与服务发现组件的交互能力则是放到了 doRegister()/doUnregister()、doSubscribe()/doUnsubscribe() 以及 doNotify() 这五个抽象方法中,由具体子类实现。这是典型的模板方法模式的应用。

核心字段介绍

分析一个实现类的第一步就是了解其核心字段,那 FailbackRegistry 的核心字段有哪些呢?

  • retryTimer(HashedWheelTimer 类型):用于定时执行失败重试操作的时间轮。
  • retryPeriod(int 类型):重试操作的时间间隔。
  • failedRegistered(ConcurrentMap类型):注册失败的 URL 集合,其中 Key 是注册失败的 URL,Value 是对应的重试任务。
  • failedUnregistered(ConcurrentMap类型):取消注册失败的 URL 集合,其中 Key 是取消注册失败的 URL,Value 是对应的重试任务。
  • failedSubscribed(ConcurrentMap类型):订阅失败 URL 集合,其中 Key 是订阅失败的 URL + Listener 集合,Value 是相应的重试任务。
  • failedUnsubscribed(ConcurrentMap类型):取消订阅失败的 URL 集合,其中 Key 是取消订阅失败的 URL + Listener 集合,Value 是相应的重试任务。
  • failedNotified(ConcurrentMap类型):通知失败的 URL 集合,其中 Key 是通知失败的 URL + Listener 集合,Value 是相应的重试任务。

在 FailbackRegistry 的构造方法中,首先会调用父类 AbstractRegistry 的构造方法完成本地缓存相关的初始化操作,然后从传入的 URL 参数中获取重试操作的时间间隔(即retry.period 参数)来初始化 retryPeriod 字段,最后初始化 retryTimer****时间轮。整个代码比较简单,这里就不展示了。

核心方法实现分析

FailbackRegistry 对 register()/unregister() 方法和 subscribe()/unsubscribe() 方法的具体实现非常类似,所以这里我们就只介绍其中register() 方法的具体实现流程。

  1. 根据 registryUrl 中 accepts 参数指定的匹配模式,决定是否接受当前要注册的 Provider URL。
  2. 调用父类 AbstractRegistry 的 register() 方法,将 Provider URL 写入 registered 集合中。
  3. 调用 removeFailedRegistered() 方法和 removeFailedUnregistered() 方法,将该 Provider URL 从 failedRegistered 集合和 failedUnregistered 集合中删除,并停止相关的重试任务。
  4. 调用 doRegister() 方法,与服务发现组件进行交互。该方法由子类实现,每个子类只负责接入一个特定的服务发现组件。
  5. 在 doRegister() 方法出现异常的时候,会根据 URL 参数以及异常的类型,进行分类处理:待注册 URL 的 check 参数为 true(默认值为 true);待注册的 URL 不是 consumer 协议;registryUrl 的 check 参数也为 true(默认值为 true)。若满足这三个条件或者抛出的异常为 SkipFailbackWrapperException,则直接抛出异常。否则,就会创建重试任务并添加到 failedRegistered 集合中。

明确 register() 方法的核心流程之后,我们再来看 register() 方法的具体代码实现:

public void register(URL url) {

    if (!acceptable(url)) { 

        logger.info("..."); // 打印相关的提示日志

        return;

    }

    super.register(url); // 完成本地文件缓存的初始化

    // 清理failedRegistered集合和failedUnregistered集合,并取消相关任务

    removeFailedRegistered(url); 

    removeFailedUnregistered(url);

    try {

        doRegister(url);  // 与服务发现组件进行交互,具体由子类实现

    } catch (Exception e) {

        Throwable t = e;

        // 检测check参数,决定是否直接抛出异常

        boolean check = getUrl().getParameter(Constants.CHECK_KEY,

               true) && url.getParameter(Constants.CHECK_KEY, true)

                && !CONSUMER_PROTOCOL.equals(url.getProtocol());

        boolean skipFailback = t instanceof 

            SkipFailbackWrapperException;

        if (check || skipFailback) {

            if (skipFailback) {

                t = t.getCause();

            }

            throw new IllegalStateException("Failed to register");

        }

        // 如果不抛出异常,则创建失败重试的任务,并添加到failedRegistered集合中

        addFailedRegistered(url);

    }

}

从以上代码可以看出,当 Provider 向 Registry 注册 URL 的时候,如果注册失败,且未设置 check 属性,则创建一个定时任务,添加到时间轮中。

下面我们再来看看创建并添加这个重试任务的相关方法——addFailedRegistered() 方法,具体实现如下:

private void addFailedRegistered(URL url) {

    FailedRegisteredTask oldOne = failedRegistered.get(url);

    if (oldOne != null) { // 已经存在重试任务,则无须创建,直接返回

        return;

    }

    FailedRegisteredTask newTask = new FailedRegisteredTask(url, 

        this);

    oldOne = failedRegistered.putIfAbsent(url, newTask); 

    if (oldOne == null) {

        // 如果是新建的重试任务,则提交到时间轮中,等待retryPeriod毫秒后执行

        retryTimer.newTimeout(newTask, retryPeriod, 

          TimeUnit.MILLISECONDS);

    }

}

重试任务

FailbackRegistry.addFailedRegistered() 方法中创建的 FailedRegisteredTask 任务以及其他的重试任务,都继承了 AbstractRetryTask 抽象类,如下图所示:

重试任务.png

在 AbstractRetryTask 中维护了当前任务关联的 URL、当前重试的次数等信息,在其 run() 方法中,会根据重试 URL 中指定的重试次数(retry.times 参数,默认值为 3)、任务是否被取消以及时间轮的状态,决定此次任务的 doRetry() 方法是否正常执行。

public void run(Timeout timeout) throws Exception {

    if (timeout.isCancelled() || timeout.timer().isStop() || isCancel()) { // 检测定时任务状态和时间轮状态

        return;

    }

    if (times > retryTimes) { // 检查重试次数

        logger.warn("...");

        return;

    }

    try {

        doRetry(url, registry, timeout); // 执行重试

    } catch (Throwable t) { 

        reput(timeout, retryPeriod); // 重新添加定时任务,等待重试

    }

}

如果任务的 doRetry() 方法执行出现异常,AbstractRetryTask 会通过 reput() 方法将当前任务重新放入时间轮中,并递增当前任务的执行次数。

protected void reput(Timeout timeout, long tick) {

    if (timeout == null) { // 边界检查

        throw new IllegalArgumentException();

    }

    Timer timer = timeout.timer(); // 检查定时任务

    if (timer.isStop() || timeout.isCancelled() || isCancel()) {

        return;

    }

    times++; // 递增times

    // 添加定时任务

    timer.newTimeout(timeout.task(), tick, TimeUnit.MILLISECONDS);

}

AbstractRetryTask 将 doRetry() 方法作为抽象方法,留给子类实现具体的重试逻辑,这也是模板方法的使用。

在子类 FailedRegisteredTask 的 doRetry() 方法实现中,会再次执行关联 Registry 的 doRegister() 方法,完成与服务发现组件交互。如果注册成功,则会调用 removeFailedRegisteredTask() 方法将当前关联的 URL 以及当前重试任务从 failedRegistered 集合中删除。如果注册失败,则会抛出异常,执行上文介绍的 reput ()方法重试。

protected void doRetry(URL url, FailbackRegistry registry, Timeout timeout) {

    registry.doRegister(url); // 重新注册

    registry.removeFailedRegisteredTask(url); // 删除重试任务

}

public void removeFailedRegisteredTask(URL url) {

    failedRegistered.remove(url);

}

另外,在 register() 方法入口处,会主动调用 removeFailedRegistered() 方法和 removeFailedUnregistered() 方法来清理指定 URL 关联的定时任务:

public void register(URL url) {

    super.register(url);

    removeFailedRegistered(url); // 清理FailedRegisteredTask定时任务

    removeFailedUnregistered(url); // 清理FailedUnregisteredTask定时任务

    try {

        doRegister(url);

    } catch (Exception e) {

        addFailedRegistered(url);

    }

}

其他核心方法

unregister() 方法以及 unsubscribe() 方法的实现方式与 register() 方法类似,只是调用的 do*() 抽象方法、依赖的 AbstractRetryTask 有所不同而已,这里就不再展开细讲。

你还记得上一课时我们介绍的 AbstractRegistry 通过本地文件缓存实现的容错机制吗?FailbackRegistry.subscribe() 方法在处理异常的时候,会先获取缓存的订阅数据并调用 notify() 方法,如果没有缓存相应的订阅数据,才会检查 check 参数决定是否抛出异常。

通过上一课时对 AbstractRegistry.notify() 方法的介绍,我们知道其核心逻辑之一就是回调 NotifyListener。下面我们就来看一下 FailbackRegistry 对 notify() 方法的覆盖:

protected void notify(URL url, NotifyListener listener, 

        List<URL> urls) {

    ... // 检查url和listener不为空(略)

    try {

        // FailbackRegistry.doNotify()方法实际上就是调用父类

        // AbstractRegistry.notify()方法,没有其他逻辑

        doNotify(url, listener, urls); 

    } catch (Exception t) {

        // doNotify()方法出现异常,则会添加一个定时任务

        addFailedNotified(url, listener, urls);

    }

}

addFailedNotified() 方法会创建相应的 FailedNotifiedTask 任务,添加到 failedNotified 集合中,同时也会添加到时间轮中等待执行。如果已存在相应的 FailedNotifiedTask 重试任务,则会更新任务需要处理的 URL 集合。

在 FailedNotifiedTask 中维护了一个 URL 集合,用来记录当前任务一次运行需要通知的 URL,每执行完一次任务,就会清空该集合,具体实现如下:

protected void doRetry(URL url, FailbackRegistry registry, 

        Timeout timeout) {

    // 如果urls集合为空,则会通知所有Listener,该任务也就啥都不做了

    if (CollectionUtils.isNotEmpty(urls)) { 

        listener.notify(urls);

        urls.clear();

    }

    reput(timeout, retryPeriod); // 将任务重新添加到时间轮中等待执行

}

从上面的代码可以看出,FailedNotifiedTask 重试任务一旦被添加,就会一直运行下去,但真的是这样吗?在 FailbackRegistry 的 subscribe()、unsubscribe() 方法中,可以看到 removeFailedNotified() 方法的调用,这里就是清理 FailedNotifiedTask 任务的地方。我们以 FailbackRegistry.subscribe() 方法为例进行介绍:

public void subscribe(URL url, NotifyListener listener) {

    super.subscribe(url, listener);

    removeFailedSubscribed(url, listener); // 关注这个方法

    try {

        doSubscribe(url, listener);

    } catch (Exception e) {

        addFailedSubscribed(url, listener);

    }

}

// removeFailedSubscribed()方法中会清理FailedSubscribedTask、FailedUnsubscribedTask、FailedNotifiedTask三类定时任务

private void removeFailedSubscribed(URL url, NotifyListener listener) {

    Holder h = new Holder(url, listener); // 清理FailedSubscribedTask

    FailedSubscribedTask f = failedSubscribed.remove(h);

    if (f != null) {

        f.cancel();

    }

    removeFailedUnsubscribed(url, listener);// 清理FailedUnsubscribedTask

    removeFailedNotified(url, listener); // 清理FailedNotifiedTask

}

介绍完 FailbackRegistry 中最核心的注册/订阅实现之后,我们再来关注其实现的恢复功能,也就是 recover() 方法。该方法会直接通过 FailedRegisteredTask 任务处理 registered 集合中的全部 URL,通过 FailedSubscribedTask 任务处理 subscribed 集合中的 URL 以及关联的 NotifyListener。

FailbackRegistry 在生命周期结束时,会调用自身的 destroy() 方法,其中除了调用父类的 destroy() 方法之外,还会调用时间轮(即 retryTimer 字段)的 stop() 方法,释放时间轮相关的资源。

总结

本课时重点介绍了 AbstractRegistry 的实现类——FailbackRegistry 的核心实现,它主要是在 AbstractRegistry 的基础上,提供了重试机制。具体方法就是通过之前课时介绍的时间轮,在 register()/ unregister()、subscribe()/ unsubscribe() 等核心方法失败时,添加重试定时任务,实现重试机制,同时也添加了相应的定时任务清理逻辑。